feat: RPC fallback for missing DAG definitions on workers#2271
Conversation
📝 WalkthroughWalkthroughThis PR implements remote DAG definition fetching as a fallback for workers executing sub-DAGs. Workers now query the coordinator for missing DAG YAML via RPC when local store lookups fail, enabling graceful sub-DAG execution in git-synced environments where DAG definitions may not yet be present locally. ChangesRemote DAG Fallback Flow
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
329b0ee to
ffb7e07
Compare
Adds GetDAG RPC to coordinator service allowing workers to fetch missing DAG definitions on demand when the local DAG store misses. This eliminates race conditions in git-synced setups where new DAG YAML may not yet be present on the worker. - New gRPC: GetDAG RPC in coordinator proto (GetDAGRequest/GetDAGResponse) - Coordinator: dagsStore field + GetDAG handler (returns YAML spec by name) - Client: GetDAG(ctx, name) method on coordinator.Client - Agent: RemoteDAGLoader func type + option, wired through dbClient - dbClient: local-first lookup with RPC fallback on miss - Worker: RemoteDAGLoader closure using coordinator client GetDAG - Context: DAGStore field added, populated at init, passed to coordinator
ffb7e07 to
87f00ae
Compare
|
Has been running without regression for a few days now, I think it is safe to merge this. |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
internal/runtime/agent/dbclient_test.go (1)
51-51: ⚡ Quick winAdd focused tests for new
GetDAGfallback branches.These updates only adapt constructor calls, but the new
GetDAGpath now has important branches (local hit, local miss + remote hit, remote nil, remote error, nil local store). A table-driven test for those cases would lock down behavior and catch regressions quickly.Also applies to: 80-80, 109-109, 136-136, 156-156
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/agent/dbclient_test.go` at line 51, Add table-driven unit tests covering the new GetDAG fallback branches for dbClient created via newDBClient (the cases: local hit, local miss + remote hit, remote returns nil, remote returns error, and nil local store). For each case, construct mockDAGStore and mockDAGRunStore behaviors and assertions verifying which store was called and what GetDAG returns; use a looped test table to assert expected results and error conditions for dbClient.GetDAG to prevent regressions across the constructor-call updates referenced around newDBClient and GetDAG.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/runtime/agent/dbclient.go`:
- Around line 32-57: Guard against a nil DAG store and only fall back to the
remote loader for true "not found" local misses: first check if o.ds == nil and
treat that as a local-miss case (log and invoke o.remoteDAGLoader if present),
otherwise call o.ds.GetDetails(ctx, name) and if it returns nil return the dag;
if it returns an error, do NOT blanket-fallback — if the error is a not-found
error (use errors.Is(err, <store-not-found-error>), e.g., dagstore.ErrNotFound
or the package-specific not-found sentinel) then attempt o.remoteDAGLoader (if
o.remoteDAGLoader == nil return the original err), but for any other local error
return it immediately; ensure you still log successful remote loads and remote
loader failures using logger and tag.DAG(name)/tag.Error(remoteErr).
In `@internal/service/coordinator/client.go`:
- Around line 1021-1037: The callback passed to attemptCall returns nil on
transport success even when the coordinator response contains a logical error in
resp.Error, preventing retries; modify the anonymous callback used with
attemptCall (the one calling client.client.GetDAG in client.go) to check resp
after the RPC: if resp is nil or resp.Error != "" return a non-nil error
(include resp.Error in the returned error) instead of nil so attemptCall sees
the failure and continues to other members; keep the existing transport error
handling for callErr unchanged.
---
Nitpick comments:
In `@internal/runtime/agent/dbclient_test.go`:
- Line 51: Add table-driven unit tests covering the new GetDAG fallback branches
for dbClient created via newDBClient (the cases: local hit, local miss + remote
hit, remote returns nil, remote returns error, and nil local store). For each
case, construct mockDAGStore and mockDAGRunStore behaviors and assertions
verifying which store was called and what GetDAG returns; use a looped test
table to assert expected results and error conditions for dbClient.GetDAG to
prevent regressions across the constructor-call updates referenced around
newDBClient and GetDAG.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 26413624-c695-4f37-8995-ec7dbf15f667
⛔ Files ignored due to path filters (3)
proto/coordinator/v1/coordinator.pb.gois excluded by!**/*.pb.goproto/coordinator/v1/coordinator_grpc.pb.gois excluded by!**/*.pb.goproto/coordinator/v1/coordinator_protoopaque.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (14)
internal/cmd/context.gointernal/cmd/coord.gointernal/cmd/startall.gointernal/runtime/agent/agent.gointernal/runtime/agent/dbclient.gointernal/runtime/agent/dbclient_test.gointernal/service/coordinator/client.gointernal/service/coordinator/handler.gointernal/service/frontend/api/v1/workers_internal_test.gointernal/service/worker/coordreport/status_pusher_test.gointernal/service/worker/poller_test.gointernal/service/worker/remote_handler.gointernal/service/worker/remote_handler_test.goproto/coordinator/v1/coordinator.proto
There was a problem hiding this comment.
1 issue found across 17 files
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
yohamta0
left a comment
There was a problem hiding this comment.
LGTM 🚀🚀🚀 Thank you very much for implementing this!
Closes #2259
Problem
When a worker executes a sub-DAG via
dag.runand the DAG YAML is not yet present in its local store, the step fails. This creates a race condition in git-synced setups:Solution
Workers fetch missing DAG definitions from the coordinator on demand via a new
GetDAGRPC as a fallback when the local store misses.Changes
GetDAGRPC in coordinator proto withGetDAGRequest/GetDAGResponsedagStorefield +GetDAGimplementation (returns YAML spec by name, returnsUnimplementedwhen dagStore is nil)GetDAG(ctx, name)method added toClientinterface + implementationdbClient:RemoteDAGLoaderfunction type wired through agent options →dbClient.GetDAG()tries local-first, then RPC fallback on missremote_handler: closure using coordinator clientGetDAG+spec.LoadYAMLto parse remote YAMLcmd/coord.go+context.go:DAGStorewired into coordinator startupData Flow
Testing
go build ./...— cleango vet ./...— clean (no new issues)agent,coordinator,worker,cmd,frontend/api/v1Notes
dagStoreon coordinator is optional (returnsUnimplementedif nil)RemoteDAGLoaderdefaults to nil → no fallback attempted on workers without a coordinator clientSummary by cubic
Adds an RPC fallback so workers can fetch missing DAG YAML from the coordinator when local DAGs aren’t synced yet, preventing sub-DAG failures. Implements a local-first lookup that only falls back on not-found and works even when a worker has no local
DAGStore(Linear #2259).New Features
GetDAGto the coordinator (GetDAGRequest/GetDAGResponse) to serve DAG YAML (Unimplemented if no store).GetDAG; workers try local-first and fall back via agentRemoteDAGLoaderinremote_handlerusingspec.LoadYAML.DAGStoreinto command context and coordinator startup (startall) so RPCs can read DAG definitions.Bug Fixes
DAGStoreand restricted fallback toexec.ErrDAGNotFound; on remote failure or empty, return the original local error; tests added.chatbridgebatcher tests by adding a synchronous bucket flush and using it in tests.Written for commit 033ebb9. Summary will update on new commits.
Summary by CodeRabbit